Antonio Coín Castro
# -- Libraries
from matplotlib import pyplot as plt
import arviz as az
import numpy as np
import pandas as pd
import pickle
import scipy
from multiprocessing import Pool
import utils
# -- Configuration
# Extensions (IPython line magics: this file is a notebook export and the
# %-lines only work when run under IPython/Jupyter)
%load_ext autoreload
%autoreload 2
# Plotting configuration
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
plt.rcParams['figure.figsize'] = [6, 4]  # default figure size in inches
plt.style.use('arviz-darkgrid')  # ArviZ's darkgrid matplotlib style
NCOLS = 3


def NROWS(x, ncols=NCOLS):
    """Number of rows needed to lay out ``x`` subplots in ``ncols`` columns."""
    rows_needed = np.ceil(x / ncols)
    return rows_needed.astype('int')
# Randomness and reproducibility
SEED = 42
# Seed both the legacy global RNG (used implicitly by some libraries) and a
# local Generator instance that is passed explicitly to the helpers below
np.random.seed(SEED)
rng = np.random.default_rng(SEED)
# Floating point precision for display
np.set_printoptions(precision=3, suppress=True)
pd.set_option("display.precision", 3)
# Multiprocessing
N_CORES = 4  # number of worker processes for the emcee sampler
We consider the model
$$ Y_i = \alpha_0 + \Psi^{-1}_{X_i}(\alpha) + \varepsilon_i, $$i.e.,
$$ Y_i \sim \mathcal N\left(\alpha_0 + \sum_{j=1}^p \beta_jX_i(t_j), \ \sigma^2\right). $$The prior distributions we choose are:
\begin{align*} \pi(\alpha_0, \sigma^2) & \propto 1/\sigma^2, \\ \tau & \sim \mathscr U([0, 1]^p), \\ \beta\mid \tau, \sigma^2 & \sim \mathcal N\left(b_0, g\sigma^2\left[\mathcal X_\tau' \mathcal X_\tau + \eta \lambda_{\text{max}}(\mathcal X_\tau' \mathcal X_\tau)\right]^{-1}\right), \end{align*}Writing the parameter vector as $\theta = (\beta, \tau, \alpha_0, \sigma^2)$, the joint posterior probability is:
$$ \pi(\beta, \tau, \alpha_0, \sigma^2\mid Y) \propto \frac{|G_\tau|^{1/2}}{\sigma^{p+n+2}} \exp\left\{ -\frac{1}{2\sigma^2} \left(\|Y- \alpha_0\boldsymbol{1} - \mathcal X_\tau\beta\|^2 + \frac{1}{g}(\beta - b_0)'G_\tau(\beta - b_0) \right) \right\}. $$Hence, the log-posterior probability is:
$$ \log \pi(\beta, \tau, \alpha_0, \sigma^2\mid Y) \propto \frac{1}{2}\log |G_\tau| - (p+n+2)\log \sigma -\frac{1}{2\sigma^2} \left(\|Y-\alpha_0\boldsymbol{1} - \mathcal X_\tau\beta\|^2 + \frac{1}{g}(\beta - b_0)'G_\tau(\beta - b_0) \right). $$Note that for computational reasons we will work with $\log \sigma$ instead of $\sigma^2$, and hence the associated prior distribution is
$$ \pi(\alpha_0, \log\sigma) \propto 1. $$We generate a toy dataset with $n=100$ functional regressors $X_i(t) \sim BM$, a response variable given by a "simple" RKHS function, a value of $\alpha_0=5$ and a variance of $\sigma^2=1.5$:
$$ Y_i \sim \mathcal N\big(5 -5X_i(0.1) + 10X_i(0.8), \ 1.5\big). $$We consider a regular grid of $N=101$ equally spaced points on $[0, 1]$ (so that consecutive points are $0.01$ apart, matching the code below). In addition, we center the $X_i$ so that they have zero mean when fed to the sampling algorithms.
We also generate a test dataset with $n_{\text{test}}=50$ regressors for model evaluation.
# -- Dataset generation
def brownian_kernel(s, t, sigma=1.0):
    """Brownian motion covariance kernel: ``sigma * min(s, t)``."""
    pointwise_min = np.minimum(s, t)
    return sigma * pointwise_min
def ornstein_uhlenbeck_kernel(s, t, l=1.0):
    """Ornstein-Uhlenbeck (exponential) kernel with length-scale ``l``."""
    distance = np.abs(s - t)
    return np.exp(-distance / l)
def squared_exponential_kernel(s, t, l=0.2):
    """Squared-exponential (RBF) kernel with length-scale ``l``."""
    diff = s - t
    return np.exp(-diff**2/(2*l**2))
# Sample sizes (train/test) and grid resolution
n_train, n_test = 100, 50
N = 101  # grid points on [0, 1]; step is exactly 0.01
grid = np.linspace(0., 1., N)
# True generating parameters: Y ~ N(5 - 5*X(0.1) + 10*X(0.8), 1.5)
beta_true = np.array([-5., 10.])
tau_true = np.array([0.1, 0.8])
alpha0_true = 5.
sigma2_true = 1.5
# Flat parameter vector (beta, tau, alpha0, sigma2) expected by utils helpers
theta_true = np.concatenate((
    beta_true, tau_true,
    [alpha0_true], [sigma2_true]
))
# Brownian-motion regressors plus Gaussian response (train and test splits)
X, Y = utils.generate_gp_dataset(
    grid, brownian_kernel,
    n_train, theta_true, rng=rng
)
X_test, Y_test = utils.generate_gp_dataset(
    grid, brownian_kernel,
    n_test, theta_true, rng=rng
)
# Standardize data
# Center with the TRAIN mean only, so no test information leaks into training
X_m = X.mean(axis=0)
X = X - X_m
X_test = X_test - X_m
utils.plot_dataset(X, Y)
We will try to recover the true model using $p=3$.
# -- Model hyperparameters
p_hat = 3  # number of impact points assumed by the model (true model has 2)
g = 5      # prior scaling factor for the covariance of beta
eta = 0.1  # ridge factor on G_tau (times its largest eigenvalue)
# Scales used to draw random initial values for the optimizer/samplers
sd_beta_init = 5
sd_alpha0_init = 10*np.abs(Y.mean())  # heuristic scale, cf. Grollemund et al (?)
sd_log_sigma_init = 1
# -- Names and labels
# Names of parameters
theta_names = ["β", "τ", "α0", "σ2"]
theta_names_aux = ["α0 and logσ", "logσ"]
# Grouped labels
theta_labels_grouped = [r"$\beta$", r"$\tau$", r"$\alpha_0$", r"$\sigma^2$"]
# Individual labels: beta_1..beta_p, tau_1..tau_p, then alpha_0 and sigma^2
theta_labels = [fr"$\beta_{j}$" for j in range(1, p_hat + 1)]
theta_labels += [fr"$\tau_{j}$" for j in range(1, p_hat + 1)]
theta_labels += theta_labels_grouped[-2:]
# Labels for Arviz: pretty names for the scalar parameters and 1-based
# indices for the "projection" coordinate
theta_labeller = az.labels.MapLabeller(
    var_name_map=dict(zip(theta_names[-2:], theta_labels_grouped[-2:])),
    coord_map={"projection": {i: i + 1 for i in np.arange(p_hat)}}
)
# Dimension of parameter vector
theta_ndim = len(theta_labels)
# Dimension of grouped parameter vector
theta_ndim_grouped = len(theta_names)
# -- Negative log-likelihood definition
def neg_ll(theta, X, Y):
    """Negative Gaussian log-likelihood of the impact-point linear model.

    Parameters
    ----------
    theta : array of shape (2p + 2,)
        Flat parameter vector ``(beta_1..beta_p, tau_1..tau_p, alpha0,
        log_sigma)``.
    X : array of shape (n, N)
        Functional regressors observed on a regular grid of N points on [0, 1].
    Y : array of shape (n,)
        Response variable.

    Returns
    -------
    float
        Negative log-likelihood up to an additive constant.

    Raises
    ------
    ValueError
        If ``theta`` does not have even length.
    """
    # Explicit check instead of `assert`, which is stripped under `python -O`
    if len(theta) % 2 != 0:
        raise ValueError(
            "theta must have even length: (beta, tau, alpha0, log_sigma)")
    n, N = X.shape
    grid = np.linspace(0., 1., N)
    p = (len(theta) - 2)//2
    beta = theta[:p]
    tau = theta[p:2*p]
    alpha0 = theta[-2]
    log_sigma = theta[-1]
    sigma = np.exp(log_sigma)
    # Map each tau_j to its nearest grid index and extract X_i(tau_j)
    idx = np.abs(grid - tau[:, np.newaxis]).argmin(1)
    X_tau = X[:, idx]
    # Gaussian log-likelihood (up to constants):
    #   -n*log(sigma) - ||Y - alpha0 - X_tau beta||^2 / (2 sigma^2)
    resid = Y - alpha0 - X_tau@beta
    return n*log_sigma + np.linalg.norm(resid)**2/(2*sigma**2)
# -- MLE estimation
# We artificially restrict the variance to a sensible value
bounds = ([(None, None)]*p_hat             # beta: unconstrained
          + [(0.0, 1.0)]*p_hat             # tau: times must lie in [0, 1]
          + [(None, None)]                 # alpha0: unconstrained
          + [(None, np.log(2*Y.std()))])   # log_sigma: capped at log(2*sd(Y))
# BUG FIX: the original call ended with a stray trailing comma, which wrapped
# the initial guess in a 1-tuple before handing it to scipy as x0
theta_init = utils.initial_guess_random(
    p_hat, sd_beta_init,
    sd_alpha0_init,
    sd_log_sigma_init,
    rng=rng)
mle = scipy.optimize.minimize(
    neg_ll,
    x0=theta_init,
    args=(X, Y),
    bounds=bounds,
)
mle_theta = mle.x
# Report parameters on the original scale: sigma^2 = exp(log_sigma)^2
mle_orig = np.copy(mle_theta)
mle_orig[-1] = np.exp(mle_theta[-1])**2  # Transform back to sigma^2
pd.DataFrame(zip(theta_labels, mle_orig),
             columns=["", "MLE"]).style.hide_index()
import emcee
We only need to provide the sampler with the logarithm of the posterior distribution. For clarity we split its computation into a log-prior and a log-likelihood term, although for a more efficient implementation both could be fused into a single function.
# -- Log-posterior model
def log_prior(theta):
    """Log-prior density of theta = (beta, tau, alpha0, log_sigma).

    Reads X, b0, g and eta from module scope (kept global so that only
    theta is pickled when parallelizing with multiprocessing).
    """
    assert len(theta) % 2 == 0
    n, N = X.shape
    p = (len(theta) - 2)//2
    grid = np.linspace(0., 1., N)
    beta, tau = theta[:p], theta[p:2*p]
    log_sigma = theta[-1]
    # Uniform prior support: reject tau outside the unit hypercube
    if np.any((tau < 0.0) | (tau > 1.0)):
        return -np.inf
    centered_beta = beta - b0
    sigma = np.exp(log_sigma)
    # Build G_tau = X_tau' X_tau, symmetrize, and regularize with a ridge of
    # eta times its largest eigenvalue
    idx = np.abs(grid - tau[:, np.newaxis]).argmin(1)
    X_tau = X[:, idx]
    gram = X_tau.T@X_tau
    gram = (gram + gram.T)/2.  # Enforce symmetry
    lam_max = np.max(np.linalg.eigvalsh(gram))
    gram_reg = gram + eta*lam_max*np.identity(p)
    # Log-prior up to an additive constant:
    #   0.5 log|G| - (p+2) log sigma - (beta-b0)' G (beta-b0) / (2 g sigma^2)
    return (0.5*utils.logdet(gram_reg)
            - (p + 2)*log_sigma
            - centered_beta.T@gram_reg@centered_beta/(2*g*sigma**2))
def log_likelihood(theta, Y):
    """Log-likelihood of theta; reads the regressor matrix X from module
    scope (kept global for efficient parallelization)."""
    nll = neg_ll(theta, X, Y)
    return -nll
def log_posterior(theta, Y):
    """Log-posterior of theta, optionally with a posterior-predictive blob.

    Reads X, rng and return_pps from module scope (kept global for efficient
    parallelization). When return_pps is True the return value is a pair
    (log-posterior, posterior-predictive sample) so emcee stores the sample
    as a blob.
    """
    lp = log_prior(theta)
    # Guard clause: outside the prior support, skip the likelihood entirely
    if not np.isfinite(lp):
        return (-np.inf, np.full_like(Y, -np.inf)) if return_pps else -np.inf
    lpos = lp + log_likelihood(theta, Y)
    if not return_pps:
        return lpos
    # Draw one posterior-predictive sample, with theta on the sigma^2 scale
    theta_orig = np.copy(theta)
    theta_orig[-1] = np.exp(theta_orig[-1])**2
    pps = utils.generate_response(X, theta_orig, rng=rng)
    return lpos, pps
We set up the initial points of the chains to be in a random neighbourhood around the MLE to increase the speed of convergence.
# -- Sampler parameters
n_walkers = 100       # walkers in the affine-invariant ensemble
n_iter_initial = 500  # tuning iterations (discarded)
n_iter = 1000         # production iterations kept
return_pps = True     # log_posterior also returns posterior-predictive blobs
# Start every walker in a (random) neighbourhood around the MLE
# NOTE(review): "intial" looks like a typo, but it may match the actual
# function name in utils — confirm before renaming here or there
p0 = utils.intial_guess_around_value(
    mle_theta, n_walkers=n_walkers, rng=rng)
# Prior mean for beta; log_prior reads this module-level b0
b0 = mle_theta[:p_hat]  # <-- Change if needed
# -- Run sampler
# Parallelize posterior evaluations over N_CORES processes; log_posterior
# reads X, rng and return_pps from module scope so only (theta, Y) is pickled
with Pool(N_CORES) as pool:
    print(
        f"-- Running affine-invariant ensemble sampler with {N_CORES} cores --")
    sampler = emcee.EnsembleSampler(
        n_walkers, theta_ndim, log_posterior, pool=pool, args=(Y,))
    # Tuning phase: advance the walkers without storing the chain
    print("Tuning phase...")
    state = sampler.run_mcmc(
        p0, n_iter_initial, progress='notebook',
        store=False)
    sampler.reset()
    # Production run, continuing from the tuned state
    print("MCMC sampling...")
    sampler.run_mcmc(state, n_iter, progress='notebook')
We analyze the samples of all chains, discarding a few times the integrated autocorrelation times worth of samples. We could also perform thinning and take only every $k$-th value.
# -- Sampler statistics and trace (with burn-in and thinning)
# Analyze autocorrelation and set burn-in and thinning values
autocorr = sampler.get_autocorr_time(quiet=True)
max_autocorr = np.max(autocorr)
burn = int(3*max_autocorr)  # discard a few autocorrelation times as burn-in
thin = 1
# Get trace of samples; shape is (steps, walkers, ndim) per emcee convention
trace = np.copy(sampler.get_chain(discard=burn, thin=thin))
trace[:, :, -1] = np.exp(trace[:, :, -1])**2  # Recover sigma^2
trace_flat = trace.reshape(-1, trace.shape[-1])  # All chains combined
# Get InferenceData object
idata_emcee = utils.emcee_to_idata(
    sampler, p_hat, theta_names, theta_names_aux[1:],
    burn, thin, return_pps)
# Update and show autocorrelation
autocorr_thin = sampler.get_autocorr_time(discard=burn, thin=thin, quiet=True)
print(
    f"Mean acceptance fraction: {100*np.mean(sampler.acceptance_fraction):.3f}%")
# Effective sample size approximated as total samples / autocorrelation time
pd.DataFrame(
    zip(theta_labels, autocorr_thin, len(trace_flat)/autocorr_thin),
    columns=["", "Autocorrelation times", "Effective i.i.d samples"]
).style.hide_index()
# Posterior summary statistics for the emcee run
utils.summary(idata_emcee, var_names=theta_names,
              kind="stats", labeller=theta_labeller)
az.plot_trace(idata_emcee, labeller=theta_labeller,
              combined=True, var_names=theta_names)
print("Combined density and trace plot:")
az.plot_posterior(idata_emcee, labeller=theta_labeller, point_estimate='mode',
                  grid=(NROWS(theta_ndim), NCOLS), textsize=20,
                  var_names=theta_names)
print("Marginal posterior distributions:")
# -- Generate and plot posterior predictive samples from X
# Reuse the blobs stored during sampling when present; otherwise draw fresh
# posterior-predictive samples from the posterior
if "posterior_predictive" not in idata_emcee:
    ppc = utils.generate_ppc(idata_emcee, X, theta_names, rng=rng)
    idata_ppc = utils.ppc_to_idata(ppc, idata_emcee, "y_rec")
else:
    idata_ppc = idata_emcee
utils.plot_ppc(idata_ppc, n_samples=500, data_pairs={'y_obs': 'y_rec'})
az.plot_autocorr(idata_emcee, combined=True, var_names=theta_names,
                 grid=(NROWS(theta_ndim), NCOLS), labeller=theta_labeller)
print("Combined autocorrelation times:")
# -- Generate and plot posterior predictive samples from X_test
ppc = utils.generate_ppc(idata_emcee, X_test, theta_names, rng=rng)
idata_ppc = utils.ppc_to_idata(ppc, idata_emcee, "y_pred", y_obs=Y_test)
utils.plot_ppc(idata_ppc, n_samples=500, data_pairs={'y_obs': 'y_pred'})
# -- Compute MSE using several point estimates
point_estimates = ["mode", "mean", "median"]
metrics_emcee = pd.DataFrame(columns=["Point estimate", "MSE", r"$R^2$"])
for pe in point_estimates:
    # Plug the point estimate of theta into the model and predict on test data
    Y_hat = utils.point_predict(X_test, idata_emcee,
                                theta_names, point_estimate=pe,
                                rng=rng)
    metrics = utils.regression_metrics(Y_test, Y_hat)
    metrics_emcee.loc[pe] = [pe, metrics["mse"], metrics["r2"]]
metrics_emcee.style.hide_index()
This is only for testing purposes; in a production environment one should use the Backends feature of emcee.
# Quick-and-dirty persistence of the InferenceData object via pickle
with open("emcee-p-fixed.idata", 'wb') as file:
    pickle.dump(idata_emcee, file)
# NOTE: only unpickle files you created yourself — pickle is unsafe on
# untrusted input
with open("emcee-p-fixed.idata", 'rb') as file:
    idata_emcee = pickle.load(file)
# Rebuild the flat trace from the reloaded posterior
# NOTE(review): to_array() stacks variables in dataset order — confirm this
# matches the (beta, tau, alpha0, sigma2) ordering assumed downstream
trace = idata_emcee.posterior.to_array().to_numpy().T
trace_flat = trace.reshape(-1, trace.shape[-1])  # All chains combined
import pymc3 as pm
import theano
import theano.tensor as tt
# -- Probabilistic model
def make_model(p, g, eta, X, Y, names, names_aux, mle_theta=None):
    """Build the PyMC3 model for the impact-point functional regression.

    Parameters
    ----------
    p : int
        Number of impact points (dimension of beta and tau).
    g : float
        Prior scaling factor for the covariance of beta.
    eta : float
        Ridge regularization factor for G_tau (times its largest eigenvalue).
    X : array of shape (n, N)
        Functional regressors on a regular grid of N points on [0, 1].
    Y : array of shape (n,)
        Response variable (observed).
    names : sequence of str
        Variable names for (beta, tau, alpha0, sigma2), in that order.
    names_aux : sequence of str
        Name for the auxiliary flat (alpha0, log_sigma) variable.
    mle_theta : array, optional
        If given, its first p entries are used as the prior mean b0 for beta;
        otherwise b0 is drawn at random (reads the module-level rng).

    Returns
    -------
    pm.Model
    """
    n, N = X.shape
    grid = np.linspace(0., 1., N)
    if mle_theta is not None:
        b0 = mle_theta[:p]
    else:
        b0 = g*rng.standard_normal(size=p)  # <-- Change if needed
    with pm.Model() as model:
        X_pm = pm.Data('X', X)
        # Flat (improper) prior on (alpha0, log_sigma): constant log-density
        alpha0_and_log_sigma = pm.DensityDist(
            names_aux[0], lambda x: 0, shape=(2,))
        alpha0 = pm.Deterministic(names[-2], alpha0_and_log_sigma[0])
        log_sigma = alpha0_and_log_sigma[1]
        sigma = pm.math.exp(log_sigma)
        sigma2 = pm.Deterministic(names[-1], sigma**2)
        tau = pm.Uniform(names[1], 0.0, 1.0, shape=(p,))
        # Snap each tau_j to its nearest grid point; take those columns of X
        idx = np.abs(grid - tau[:, np.newaxis]).argmin(1)
        X_tau = X_pm[:, idx]
        G_tau = pm.math.matrix_dot(X_tau.T, X_tau)
        G_tau = (G_tau + G_tau.T)/2.  # Enforce symmetry
        G_tau_reg = G_tau + eta * \
            tt.max(tt.nlinalg.eigh(G_tau)[0])*np.identity(p)

        def beta_lprior(x):
            # log N(b0, g*sigma^2*G_tau_reg^{-1}) up to additive constants
            b = x - b0
            return (0.5*pm.math.logdet(G_tau_reg)
                    - p*log_sigma
                    - pm.math.matrix_dot(b.T, G_tau_reg, b)/(2.*g*sigma2))
        beta = pm.DensityDist(names[0], beta_lprior, shape=(p,))
        expected_obs = alpha0 + pm.math.matrix_dot(X_tau, beta)
        y_obs = pm.Normal('y_obs', mu=expected_obs, sigma=sigma, observed=Y)
    return model
# -- NUTS with MLE as starting point
model_mle = make_model(p_hat, g, eta, X, Y, theta_names,
                       theta_names_aux[:1], mle_theta)
with model_mle:
    print("Starting from MLE...")
    # tau is sampled on the unconstrained (interval-transformed) space, so
    # its starting value must be pushed through the forward transform
    ttr = model_mle.named_vars[theta_names[1]].transformation
    start = {theta_names[0]: mle_theta[:p_hat],
             theta_names[1] + "_interval__":
             ttr.forward(mle_theta[p_hat:2*p_hat]).eval(),
             theta_names_aux[0]: mle_theta[-2:]}
    idata_mle = pm.sample(1000, cores=2, tune=1000, start=start,
                          target_accept=0.8, return_inferencedata=True)
utils.summary(idata_mle, var_names=theta_names,
              kind="stats", labeller=theta_labeller)
# -- NUTS with MAP as starting point
model_map = make_model(p_hat, g, eta, X, Y, theta_names,
                       theta_names_aux[:1], mle_theta)
with model_map:
    print("Computing MAP estimate...")
    start = pm.find_MAP()
    print("Starting from MAP estimate...")
    idata_map = pm.sample(1000, cores=2, tune=1000, start=start,
                          target_accept=0.8, return_inferencedata=True)
utils.summary(idata_map, var_names=theta_names,
              kind="stats", labeller=theta_labeller)
# -- NUTS with auto initialization (PyMC3 picks the starting point)
model_auto = make_model(p_hat, g, eta, X, Y, theta_names,
                        theta_names_aux[:1], mle_theta)
with model_auto:
    idata_auto = pm.sample(1000, cores=2, tune=1000, target_accept=0.8,
                           return_inferencedata=True)
utils.summary(idata_auto, var_names=theta_names,
              kind="stats", labeller=theta_labeller)
Since the tuning iterations already serve as burn-in, we keep the whole trace. In addition, we could consider thinning the samples.
# -- Select and print best model (with burn-in and thinning)
# The tuning iterations already act as burn-in, so keep the whole trace
burn = 0
thin = 1
model = model_auto
idata_pymc = idata_auto
idata_pymc = idata_pymc.sel(draw=slice(burn, None, thin))
print("Graphical model:")
pm.model_graph.model_to_graphviz(model)
az.plot_trace(idata_pymc, var_names=theta_names, labeller=theta_labeller)
print("Density and trace plot:")
az.plot_posterior(
    idata_pymc, point_estimate='mode',
    var_names=theta_names,
    labeller=theta_labeller,
    textsize=20,
    grid=(NROWS(theta_ndim), NCOLS))
print("Marginal posterior distributions:")
# -- Generate and plot posterior predictive samples from X
with model:
    print("Generating posterior predictive samples...")
    ppc = pm.sample_posterior_predictive(idata_pymc)
    idata_pymc.extend(az.from_pymc3(posterior_predictive=ppc))
utils.plot_ppc(idata_pymc, n_samples=500, data_pairs={'y_obs': 'y_obs'})
az.plot_autocorr(idata_pymc, var_names=theta_names,
                 combined=True, grid=(NROWS(theta_ndim), NCOLS),
                 labeller=theta_labeller)
print("Combined autocorrelation times:")
First we take a look at the distribution of predictions on a previously unseen dataset.
# -- Generate and plot posterior predictive samples from X_test
# Rebuild the model with the hold-out data so the observed container has the
# right shape, then sample predictives from the already-trained posterior
model_test = make_model(p_hat, g, eta, X_test, Y_test, theta_names,
                        theta_names_aux[:1], mle_theta)
with model_test:
    print("Generating posterior predictive on hold-out data...")
    ppc_test = pm.sample_posterior_predictive(idata_pymc)
utils.plot_ppc(
    az.from_pymc3(posterior_predictive=ppc_test, model=model_test),
    n_samples=500,
    data_pairs={'y_obs': 'y_obs'})
Next we look at the MSE when using several point-estimates for the parameters.
# -- Compute MSE using several point estimates
point_estimates = ["mode", "mean", "median"]
metrics_pymc = pd.DataFrame(columns=["Point estimate", "MSE", r"$R^2$"])
for pe in point_estimates:
    # Plug the point estimate of theta into the model and predict on test data
    Y_hat = utils.point_predict(X_test, idata_pymc,
                                theta_names, point_estimate=pe,
                                rng=rng)
    metrics = utils.regression_metrics(Y_test, Y_hat)
    metrics_pymc.loc[pe] = [pe, metrics["mse"], metrics["r2"]]
metrics_pymc.style.hide_index()
# Persist/reload the PyMC samples via netCDF (ArviZ's native format)
_ = idata_pymc.to_netcdf("pymc-p-fixed.nc")
idata_pymc = az.from_netcdf("pymc-p-fixed.nc")
# Session/environment info (IPython magics; require Jupyter)
%load_ext watermark
%watermark -n -u -v -iv -w